package com.flow.framework.core.system.thread.pool.executor;

import com.flow.framework.common.error.SystemErrorCode;
import com.flow.framework.common.exception.CheckedException;
import com.flow.framework.common.util.verify.VerifyUtil;
import com.flow.framework.core.system.checker.ThreadPoolStatusChecker;
import com.flow.framework.core.system.thread.pool.factory.DefaultThreadFactory;
import com.flow.framework.core.system.thread.pool.policy.RejectedPolicy;
import com.flow.framework.core.system.thread.pool.task.BaseCallable;
import com.flow.framework.core.system.thread.pool.task.BaseRunnable;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.TimeUnit;

/**
 * Thread pool adapter, mainly intended to provide features such as trace id propagation and
 * task timeout checking.
 *
 * <p>Every instance wraps a {@link java.util.concurrent.ThreadPoolExecutor} and registers a
 * {@link ThreadPoolStatusChecker} with {@code SystemStatusCheckerExecutor}. Callers may supply a
 * per-submission {@link RejectedPolicy}; it is handed to the rejection handler through a
 * {@link ThreadLocal} that is bound only for the duration of the submitting call.
 *
 * @author luoguopiao
 * @version 0.0.1
 * @date 2022/1/23
 */
@Slf4j
public class ThreadPoolExecutor {

    /**
     * Holds the rejection handler that takes priority for the submission currently being made by
     * the calling thread. It is set right before the task is handed to the underlying pool and is
     * always cleared again before the submitting method returns.
     */
    private static final ThreadLocal<CustomizationRejectedHandler> PRIORITY_REJECTED_EXECUTION_HANDLER_THREAD_LOCAL = new ThreadLocal<>();

    /**
     * The underlying JDK thread pool that actually runs the tasks.
     */
    private final java.util.concurrent.ThreadPoolExecutor executor;

    /**
     * Status checker that is notified about every submitted task.
     */
    private final ThreadPoolStatusChecker threadPoolStatusChecker;

    /**
     * Default customized rejection handler, resolved at construction time from the
     * {@link RejectedPolicy} passed to the constructor.
     */
    private final CustomizationRejectedHandler defaultPolicyHandler;

    /**
     * Creates the pool. Generally speaking, most web applications are IO-bound (network, disk,
     * ...) unless they are used for numeric computation.
     *
     * @param corePoolSize     number of core threads, i.e. the minimum number of threads
     * @param maximumPoolSize  maximum number of threads
     * @param keepAliveTime    how long, in seconds, a non-core thread may stay idle before it is reclaimed
     * @param workQueueSize    capacity of the task queue
     * @param threadNamePrefix name prefix for the pool's threads; a trailing {@code "-"} is appended when missing
     * @param policy           strategy applied to tasks submitted while the pool is saturated
     * @throws CheckedException with {@link SystemErrorCode#PARAMS_ERROR} when
     *                          {@code VerifyUtil.hasEmpty(threadNamePrefix, policy)} reports an empty argument
     */
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, int workQueueSize,
                              String threadNamePrefix, RejectedPolicy policy) {
        if (VerifyUtil.hasEmpty(threadNamePrefix, policy)) {
            throw new CheckedException(SystemErrorCode.PARAMS_ERROR);
        }
        this.threadPoolStatusChecker = new com.flow.framework.core.system.checker.impl.ThreadPoolStatusChecker();
        this.defaultPolicyHandler = CustomizationRejectedHandlerHolder.getCustomizationHandler(policy);

        // Make sure a non-empty prefix ends with "-"; an empty prefix is passed through untouched.
        String normalizedPrefix = threadNamePrefix;
        if (!VerifyUtil.isEmpty(threadNamePrefix) && !threadNamePrefix.endsWith("-")) {
            normalizedPrefix = threadNamePrefix + "-";
        }
        DefaultThreadFactory defaultThreadFactory = new DefaultThreadFactory(normalizedPrefix);

        this.executor = new java.util.concurrent.ThreadPoolExecutor(corePoolSize, maximumPoolSize,
                keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(workQueueSize),
                defaultThreadFactory, new ProtoRejectedExecutionHandler());
        // Register the checker only after the pool has been created successfully.
        SystemStatusCheckerExecutor.schedule(threadPoolStatusChecker);
    }

    /**
     * Submits a task; on rejection the pool's default rejection handler is used.
     *
     * @param task the task to run
     * @return future representing the submitted task
     */
    public Future<?> submit(BaseRunnable task) {
        Future<?> future = executor.submit(task);
        threadPoolStatusChecker.onSubmit(future, task);
        return future;
    }

    /**
     * Submits a task with a rejection policy that applies to this submission only.
     *
     * @param task           the task to run
     * @param rejectedPolicy rejection policy used for this particular submission
     * @return future representing the submitted task
     */
    public Future<?> submit(BaseRunnable task, RejectedPolicy rejectedPolicy) {
        CustomizationRejectedHandler customizationHandler = CustomizationRejectedHandlerHolder.getCustomizationHandler(rejectedPolicy);
        PRIORITY_REJECTED_EXECUTION_HANDLER_THREAD_LOCAL.set(customizationHandler);
        Future<?> future;
        try {
            future = executor.submit(task);
        } finally {
            // The rejection handler only clears the ThreadLocal when a rejection actually happens.
            // Clear it here as well so an accepted task (or an exception thrown before rejection)
            // does not leave this policy bound to the caller thread for later submissions.
            PRIORITY_REJECTED_EXECUTION_HANDLER_THREAD_LOCAL.remove();
        }
        threadPoolStatusChecker.onSubmit(future, task);
        return future;
    }

    /**
     * Submits a value-returning task; on rejection the pool's default rejection handler is used.
     *
     * @param task the task to run
     * @param <T>  result type of the task
     * @return future representing the submitted task
     */
    public <T> Future<T> submit(BaseCallable<T> task) {
        Future<T> future = executor.submit(task);
        threadPoolStatusChecker.onSubmit(future, task);
        return future;
    }

    /**
     * Submits a value-returning task with a rejection policy that applies to this submission only.
     *
     * @param task           the task to run
     * @param rejectedPolicy rejection policy used for this particular submission
     * @param <T>            result type of the task
     * @return future representing the submitted task
     */
    public <T> Future<T> submit(BaseCallable<T> task, RejectedPolicy rejectedPolicy) {
        CustomizationRejectedHandler customizationHandler = CustomizationRejectedHandlerHolder.getCustomizationHandler(rejectedPolicy);
        PRIORITY_REJECTED_EXECUTION_HANDLER_THREAD_LOCAL.set(customizationHandler);
        Future<T> future;
        try {
            future = executor.submit(task);
        } finally {
            // Same cleanup as the Runnable overload: never let the per-call policy outlive the call.
            PRIORITY_REJECTED_EXECUTION_HANDLER_THREAD_LOCAL.remove();
        }
        threadPoolStatusChecker.onSubmit(future, task);
        return future;
    }

    /**
     * Shuts down the underlying pool (if it is not already shut down) and stops the status
     * checker listener. Failures of either step are logged and not propagated, so a failure in
     * the first step does not prevent the second one from running.
     */
    public void shutdown() {
        if (!executor.isShutdown()) {
            try {
                executor.shutdown();
            } catch (Exception e) {
                log.error("shutdown thread pool error.", e);
            }
        }

        try {
            SystemStatusCheckerExecutor.stopListener(threadPoolStatusChecker);
        } catch (Exception e) {
            log.error("shutdown thread pool listener error.", e);
        }
    }

    /**
     * Rejection handler installed on the underlying pool. It delegates to the default customized
     * handler and passes along the per-call handler bound to the current thread, if any.
     */
    private class ProtoRejectedExecutionHandler implements RejectedExecutionHandler {

        @Override
        public void rejectedExecution(Runnable r, java.util.concurrent.ThreadPoolExecutor executor) {
            try {
                defaultPolicyHandler.rejectedExecution(r, executor, threadPoolStatusChecker,
                        PRIORITY_REJECTED_EXECUTION_HANDLER_THREAD_LOCAL.get());
            } finally {
                // Drop the per-call handler once it has been consumed.
                PRIORITY_REJECTED_EXECUTION_HANDLER_THREAD_LOCAL.remove();
            }
        }
    }
}
